/*
 *  +----------------------------------------------------------------------
 *  | sophon [ A FAST GAME FRAMEWORK ]
 *  +----------------------------------------------------------------------
 *  | Copyright (c) 2023-2029 All rights reserved.
 *  +----------------------------------------------------------------------
 *  | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
 *  +----------------------------------------------------------------------
 *  | Author: jqiris <1920624985@qq.com>
 *  +----------------------------------------------------------------------
 */

use std::{sync::Arc, time::Duration};

use anyhow::Result;
use futures::executor::block_on;
use nats::*;

use crate::prelude::join_path;
use crate::{configs::RpcConf, errors::*, logger::*, treaty::*};
use crate::{error, fatal, info};

use super::*;

/// Builds the RPC backend described by `cfg` and returns it behind an [`Arc`].
///
/// Only the `"nats"` backend is supported. The configured prefix, debug flag
/// and dial timeout (interpreted as seconds) are forwarded to [`NatsRpc::new`]
/// as options. When `cfg.auth_user` is set, the connection is made with
/// user/password credentials; a missing `cfg.auth_passwd` is sent as an empty
/// password.
///
/// # Errors
///
/// Returns `NatsError::NatsUseTypeError` (after reporting through `fatal!`)
/// when `cfg.use_type` is anything other than `"nats"`.
///
/// # Panics
///
/// [`NatsRpc::new`] panics if the NATS connection cannot be established.
pub async fn new_rpc_server(cfg: RpcConf, server: Option<Server>) -> Result<ServerRpc, NatsError> {
    let is_nats = &cfg.use_type == "nats";
    if !is_nats {
        fatal!("not support rpc type: {}", &cfg.use_type);
        return Err(NatsError::NatsUseTypeError(cfg.use_type.to_owned()));
    }

    // Translate the plain configuration values into option closures.
    let options: Vec<NatsRpcOption> = vec![
        with_nats_prefix(cfg.prefix),
        with_nats_debug_msg(cfg.debug_msg),
        with_nats_dial_timeout(Duration::from_secs(cfg.dial_timeout)),
    ];

    // Credentials exist only when a user name is configured; the password
    // falls back to the empty string.
    let auths = match (cfg.auth_user, cfg.auth_passwd) {
        (Some(user), passwd) => Some((user, passwd.unwrap_or_default())),
        (None, _) => None,
    };

    let rpc = NatsRpc::new(server, cfg.endpoints, auths, Some(options)).await;
    Ok(Arc::new(rpc))
}

/// A boxed configuration step for [`NatsRpc`].
///
/// `NatsRpc::new` first builds the value with its defaults and then calls each
/// supplied option once, in order, with a mutable reference to it.
type NatsRpcOption = Box<dyn FnMut(&mut NatsRpc)>;

/// Returns an option that sets `NatsRpc::debug_msg` to `debug_msg`.
pub fn with_nats_debug_msg(debug_msg: bool) -> NatsRpcOption {
    let apply = move |rpc: &mut NatsRpc| rpc.debug_msg = debug_msg;
    Box::new(apply)
}

/// Returns an option that sets `NatsRpc::prefix` to a copy of `prefix`.
///
/// The string is cloned on every invocation because the option is an `FnMut`
/// and may be called more than once.
pub fn with_nats_prefix(prefix: String) -> NatsRpcOption {
    let apply = move |rpc: &mut NatsRpc| rpc.prefix = prefix.clone();
    Box::new(apply)
}

/// Returns an option that sets `NatsRpc::dial_timeout` to `timeout`.
pub fn with_nats_dial_timeout(timeout: Duration) -> NatsRpcOption {
    let apply = move |rpc: &mut NatsRpc| rpc.dial_timeout = timeout;
    Box::new(apply)
}

/// RPC backend that talks to other servers over a NATS connection.
pub struct NatsRpc {
    /// NATS connection used for every subscribe, publish and request call.
    client: Connection,
    /// The local server this RPC instance belongs to, if any; `subscribe` and
    /// `queue_subscribe` fail with `NatsServerNull` when it is `None`.
    server: Option<Server>,
    /// When `true`, incoming message payloads are logged before the callback
    /// runs.
    debug_msg: bool,
    /// First segment of every subject, joined to the rest with `/`.
    /// Defaults to `"nats_rpc"`.
    prefix: String,
    /// Server finder backing `find_server` / `remove_find_cache`; it is also
    /// registered as a server event handler during construction.
    finder: Arc<Finder<String>>,
    /// Timeout applied to requests that do not carry their own; defaults to
    /// five seconds.
    dial_timeout: Duration,
}

impl NatsRpc {
    /// Connects to NATS and builds a `NatsRpc` around the connection.
    ///
    /// * `server` - the local server description, stored as-is.
    /// * `endpoints` - server addresses; joined with `,` before connecting.
    /// * `auths` - optional `(user, password)` pair used for the connection.
    /// * `options` - optional configuration steps, applied in order after the
    ///   defaults (`debug_msg = false`, `prefix = "nats_rpc"`,
    ///   `dial_timeout = 5s`) are in place.
    ///
    /// A `Finder` is created and registered as a server event handler before
    /// the value is assembled.
    ///
    /// # Panics
    ///
    /// Panics with `"nats connect failed"` if the connection cannot be
    /// established.
    pub async fn new(
        server: Option<Server>,
        endpoints: Vec<String>,
        auths: Option<(String, String)>,
        options: Option<Vec<NatsRpcOption>>,
    ) -> Self {
        let urls = endpoints.join(",").into_server_list();
        let client = if let Some((user, passwd)) = auths {
            nats::Options::with_user_pass(&user, &passwd)
                .connect(urls)
                .expect("nats connect failed")
        } else {
            nats::connect(urls).expect("nats connect failed")
        };

        // The finder listens to server events, so hand a clone of it to the
        // event-handler registry.
        let finder = Finder::<String>::new().await;
        let hander: ServerHander = finder.clone();
        let mut handlers = vec![hander];
        register_server_event_handlers(&mut handlers).await;

        // Start from the defaults, then let the caller's options override them.
        let mut rpc = Self {
            client,
            server,
            debug_msg: false,
            prefix: "nats_rpc".into(),
            finder,
            dial_timeout: Duration::from_secs(5),
        };
        for mut option in options.into_iter().flatten() {
            option(&mut rpc);
        }
        rpc
    }

    /// Resolves the timeout for a single request: the per-call value when one
    /// is given, otherwise the instance-wide `dial_timeout`.
    fn dial_timeout(&self, dial_timeout: Option<Duration>) -> Duration {
        dial_timeout.unwrap_or(self.dial_timeout)
    }
}

impl IServerRpc for NatsRpc {
    fn subscribe(&self, s: RssBuilder) -> Result<(), NatsError> {
        match &self.server {
            None => Err(NatsError::NatsServerNull()),
            Some(server) => {
                let debug_msg = self.debug_msg.clone();
                let sub_path = join_path(reg_server_item(&server), s.suffix);
                let sub = format!("{}/{}", self.prefix, sub_path);
                let res = match self.client.subscribe(&sub) {
                    Err(err) => Err(NatsError::NatsSubscribeError(err.to_string())),
                    Ok(subs) => {
                        subs.with_handler(move |msg| {
                            let callback = s.callback.clone();
                            deal_msg(msg, callback, debug_msg);
                            Ok(())
                        });
                        Ok(())
                    }
                };
                res
            }
        }
    }

    fn subscribe_broadcast(&self, s: RssBuilder) -> Result<(), NatsError> {
        let debug_msg = self.debug_msg.clone();
        let sub_path = join_path(s.server.server_type, s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        let res = match self.client.subscribe(&sub) {
            Err(err) => Err(NatsError::NatsSubscribeError(err.to_string())),
            Ok(subs) => {
                subs.with_handler(move |msg| {
                    let callback = s.callback.clone();
                    deal_msg(msg, callback, debug_msg);
                    Ok(())
                });
                Ok(())
            }
        };
        res
    }

    fn queue_subscribe(&self, s: RssBuilder) -> Result<(), NatsError> {
        if self.server.is_none() {
            return Err(NatsError::NatsServerNull());
        }
        match &self.server {
            None => Err(NatsError::NatsServerNull()),
            Some(server) => {
                let debug_msg = self.debug_msg.clone();
                let sub_path = join_path(reg_server_queue(&server.server_type, &s.queue), s.suffix);
                let sub = format!("{}/{}", self.prefix, sub_path);
                let res = match self.client.subscribe(&sub) {
                    Err(err) => Err(NatsError::NatsSubscribeError(err.to_string())),
                    Ok(subs) => {
                        subs.with_handler(move |msg| {
                            let callback = s.callback.clone();
                            deal_msg(msg, callback, debug_msg);
                            Ok(())
                        });
                        Ok(())
                    }
                };
                res
            }
        }
    }
    fn publish(&self, s: ReqBuilder) -> Result<(), NatsError> {
        let mut data = Vec::new();
        if let Some(v) = s.req {
            data = v;
        }
        let server = s.server.unwrap();
        let sub_path = join_path(reg_server_item(&server), s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        if let Err(err) = self.client.publish(&sub, data) {
            return Err(NatsError::NatsPublishError(err.to_string()));
        }
        Ok(())
    }

    fn publish_broadcast(&self, s: ReqBuilder) -> Result<(), NatsError> {
        let mut data = Vec::new();
        if let Some(v) = s.req {
            data = v;
        }
        let mut server_type = &s.server_type;
        if let Some(srv) = &s.server {
            if server_type.len() < 1 {
                server_type = &srv.server_type;
            }
        }
        let sub_path = join_path(server_type.to_owned(), s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        if let Err(err) = self.client.publish(&sub, data) {
            return Err(NatsError::NatsPublishError(err.to_string()));
        }
        Ok(())
    }

    fn queue_publish(&self, s: ReqBuilder) -> Result<(), NatsError> {
        let mut data = Vec::new();
        if let Some(v) = s.req {
            data = v;
        }
        let sub_path = join_path(reg_server_queue(&s.server_type, &s.queue), s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        if let Err(err) = self.client.publish(&sub, data) {
            return Err(NatsError::NatsPublishError(err.to_string()));
        }
        Ok(())
    }

    fn request(&self, s: ReqBuilder) -> Result<Vec<u8>, NatsError> {
        let mut data = Vec::new();
        if let Some(v) = s.req {
            data = v;
        }
        let server = s.server.unwrap();
        let sub_path = join_path(reg_server_item(&server), s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        let resp = self
            .client
            .request_timeout(&sub, data, self.dial_timeout(s.dial_timeout));
        match resp {
            Ok(v) => Ok(v.data),
            Err(err) => Err(NatsError::NatsRequestError(err.to_string())),
        }
    }

    fn queue_request(&self, s: ReqBuilder) -> Result<Vec<u8>, NatsError> {
        let mut data = Vec::new();
        if let Some(v) = s.req {
            data = v;
        }
        let server = s.server.unwrap();
        let sub_path = join_path(reg_server_queue(&server.server_type, &s.queue), s.suffix);
        let sub = format!("{}/{}", self.prefix, sub_path);
        let resp = self
            .client
            .request_timeout(&sub, data, self.dial_timeout(s.dial_timeout));
        match resp {
            Ok(v) => Ok(v.data),
            Err(err) => Err(NatsError::NatsRequestError(err.to_string())),
        }
    }

    fn get_server(&self) -> Option<Server> {
        match &self.server {
            None => None,
            Some(s) => Some(s.clone()),
        }
    }

    fn find_server(
        &self,
        server_type: &str,
        arg: String,
        options: Option<Vec<FilterOption>>,
    ) -> Option<Server> {
        block_on(async { self.finder.get_user_server(server_type, arg, options).await })
    }
    fn remove_find_cache(&self, arg: String) {
        self.finder.remove_user_server(arg);
    }

    fn close(&self) -> Result<(), NatsError> {
        Ok(())
    }
}

/// Runs `callback` on the payload of `msg` and, when the callback produces a
/// value, sends it back as the reply to `msg`.
///
/// The payload is logged first when `debug_msg` is set. A failure to respond
/// is logged and otherwise ignored.
fn deal_msg(msg: Message, callback: CallbackFunc, debug_msg: bool) {
    if debug_msg {
        info!("dealmsg,{:?}", &msg.data);
    }
    // Nothing to send back when the callback yields no reply.
    let reply = match callback(&msg.data) {
        Some(reply) => reply,
        None => return,
    };
    if let Err(err) = msg.respond(reply) {
        error!("respond msg err:{}", err);
    }
}
